-
-
Notifications
You must be signed in to change notification settings - Fork 867
Engine v1 improvements #1627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Engine v1 improvements #1627
Conversation
|
Warning There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure. 🔧 eslint
apps/webapp/app/env.server.tsOops! Something went wrong! :( ESLint: 8.45.0 ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct. The config "custom" was referenced from the config file in "/.eslintrc.js". If you still have problems, please stop by https://eslint.org/chat/help to chat with the team. WalkthroughThis pull request introduces significant enhancements to the MarQS (Message Queue System) and related infrastructure, focusing on improving queue management, concurrency control, and error handling. The changes span multiple files across the application, introducing new configuration options, refactoring queue selection strategies, and adding more robust error tracking mechanisms. Key modifications include adding environment variables for queue consumer settings, updating queue selection logic, and implementing a maximum negative acknowledgment count for messages. Changes
Sequence DiagramsequenceDiagram
participant Client
participant MarQS
participant QueueConsumer
participant MessageQueue
Client->>MarQS: Enqueue Message
MarQS->>MessageQueue: Store Message
MarQS-->>Client: Message Enqueued
QueueConsumer->>MarQS: Dequeue Message
MarQS->>MessageQueue: Retrieve Message
MarQS->>QueueConsumer: Message Details
alt Message Processing Successful
QueueConsumer->>MarQS: Acknowledge Message
MarQS->>MessageQueue: Remove Message
else Message Processing Failed
QueueConsumer->>MarQS: Negative Acknowledge
MarQS->>MessageQueue: Check Nack Count
alt Nack Count Exceeded
MarQS->>MessageQueue: Permanently Fail Message
else Nack Count Within Limit
MarQS->>MessageQueue: Requeue Message
end
end
Possibly related PRs
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🔭 Outside diff range comments (1)
apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts (1)
Line range hint
100-105
: Fix incorrecttotalWeight
assignment overwriting calculated value.In the
#calculateQueueWeights
method, after calculatingtotalWeight
based on size and age, the return statement assignstotalWeight: age
, overwriting the calculatedtotalWeight
. This causes the weighting logic to be ineffective.Apply this diff to fix the issue:
return { queue, - totalWeight: age, + totalWeight: totalWeight, };
🧹 Nitpick comments (6)
references/v3-catalog/src/trigger/queues.ts (1)
34-42
: Consider making queue name configurable.The hardcoded queue name "named-queue" might limit reusability. Consider making it configurable through environment variables or task parameters.
apps/webapp/app/models/taskQueue.server.ts (1)
5-58
: Add error logging and improve type safety.The function could benefit from error logging when queue lookups fail, and explicit return type annotations.
Consider these improvements:
export async function findQueueInEnvironment( queueName: string, environmentId: string, backgroundWorkerTaskId?: string, backgroundTask?: { queueConfig?: unknown } -): Promise<TaskQueue | undefined> { +): Promise<TaskQueue | undefined> { const sanitizedQueueName = sanitizeQueueName(queueName); const queue = await prisma.taskQueue.findFirst({ where: { runtimeEnvironmentId: environmentId, name: sanitizedQueueName, }, }); if (queue) { return queue; } + + logger.debug("Queue not found with sanitized name", { + sanitizedQueueName, + environmentId, + });apps/webapp/app/v3/marqs/v2.server.ts (1)
85-85
: Consider making maximumNackCount configurable.The hardcoded value of 10 for
maximumNackCount
might need to be adjusted based on different environments or use cases. Consider making this configurable through environment variables, similar to other MarQS settings.- maximumNackCount: 10, + maximumNackCount: env.V2_MARQS_MAXIMUM_NACK_COUNT ?? 10,apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts (1)
Line range hint
705-767
: Consider refactoring duplicated OpenTelemetry configuration logic.The OpenTelemetry configuration logic is duplicated between
resolveBuiltInDevVariables
andresolveBuiltInProdVariables
. Consider extracting this into a shared helper function to improve maintainability.+function getOtelBatchConfig(config: { + enabled: string; + spanMaxExportBatchSize: string; + spanScheduledDelayMillis: string; + spanExportTimeoutMillis: string; + spanMaxQueueSize: string; + logMaxExportBatchSize: string; + logScheduledDelayMillis: string; + logExportTimeoutMillis: string; + logMaxQueueSize: string; +}): EnvironmentVariable[] { + if (config.enabled !== "1") return []; + return [ + { key: "OTEL_BATCH_PROCESSING_ENABLED", value: "1" }, + { key: "OTEL_SPAN_MAX_EXPORT_BATCH_SIZE", value: config.spanMaxExportBatchSize }, + { key: "OTEL_SPAN_SCHEDULED_DELAY_MILLIS", value: config.spanScheduledDelayMillis }, + { key: "OTEL_SPAN_EXPORT_TIMEOUT_MILLIS", value: config.spanExportTimeoutMillis }, + { key: "OTEL_SPAN_MAX_QUEUE_SIZE", value: config.spanMaxQueueSize }, + { key: "OTEL_LOG_MAX_EXPORT_BATCH_SIZE", value: config.logMaxExportBatchSize }, + { key: "OTEL_LOG_SCHEDULED_DELAY_MILLIS", value: config.logScheduledDelayMillis }, + { key: "OTEL_LOG_EXPORT_TIMEOUT_MILLIS", value: config.logExportTimeoutMillis }, + { key: "OTEL_LOG_MAX_QUEUE_SIZE", value: config.logMaxQueueSize }, + ]; +}Also applies to: 767-860
apps/webapp/app/env.server.ts (1)
165-166
: LGTM! Consider adding documentation for the new environment variables.The new environment variables for queue management and message handling look good. However, it would be helpful to add JSDoc comments explaining:
- The purpose and impact of each variable
- The recommended values for different scenarios
- The relationship between these variables (e.g., how
MARQS_SHARED_QUEUE_SELECTION_COUNT
relates toMARQS_DEV_QUEUE_SELECTION_COUNT
)Also applies to: 224-226
apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (1)
436-441
: LGTM! Consider adding error handling for edge cases.The use of
findQueueInEnvironment
improves queue retrieval by encapsulating the logic and providing better context. However, consider adding error handling for cases where:
- The background task is undefined
- The queue name format is invalid
- The environment ID is invalid
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
apps/webapp/app/env.server.ts
(2 hunks)apps/webapp/app/models/taskQueue.server.ts
(1 hunks)apps/webapp/app/routes/admin.api.v1.marqs.ts
(0 hunks)apps/webapp/app/services/apiAuth.server.ts
(2 hunks)apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
(5 hunks)apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
(2 hunks)apps/webapp/app/v3/marqs/index.server.ts
(26 hunks)apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
(1 hunks)apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
(31 hunks)apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts
(2 hunks)apps/webapp/app/v3/marqs/types.ts
(2 hunks)apps/webapp/app/v3/marqs/v2.server.ts
(1 hunks)apps/webapp/app/v3/models/workerDeployment.server.ts
(1 hunks)apps/webapp/app/v3/services/createBackgroundWorker.server.ts
(2 hunks)apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
(7 hunks)apps/webapp/app/v3/services/finalizeTaskRun.server.ts
(4 hunks)apps/webapp/app/v3/services/triggerTask.server.ts
(2 hunks)packages/core/src/v3/schemas/messages.ts
(4 hunks)references/v3-catalog/src/trigger/queues.ts
(1 hunks)
💤 Files with no reviewable changes (1)
- apps/webapp/app/routes/admin.api.v1.marqs.ts
✅ Files skipped from review due to trivial changes (2)
- apps/webapp/app/v3/services/triggerTask.server.ts
- apps/webapp/app/v3/services/createBackgroundWorker.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
apps/webapp/app/v3/marqs/index.server.ts
[error] 450-450: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (buildjet-8vcpu-ubuntu-2204 - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (buildjet-8vcpu-ubuntu-2204 - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (16)
apps/webapp/app/v3/marqs/index.server.ts (1)
450-450
: Retain thecontinue
statement for correct loop control flow.Although the static analysis suggests that the
continue
statement is unnecessary, it is essential here. Thecontinue
ensures that after logging the error, the loop proceeds to the next iteration. Removing it would alter the loop behavior and potentially skip necessary steps.🧰 Tools
🪛 Biome (1.9.4)
[error] 450-450: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (2)
750-755
: Avoid logging potentially sensitive data in debug logs.In the debug log statement,
queueMessage: message
includes the entire message payload, which may contain sensitive information. To prevent potential PII leakage, avoid logging full message contents.[security/PII leakage issue]
Apply this diff to fix the issue:
if (!queue) { logger.debug("SharedQueueConsumer queue not found, so nacking message", { - queueMessage: message, + messageId: message.messageId, taskRunQueue: lockedTaskRun.queue, runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId, });
Line range hint
1049-1054
: Prevent PII leakage in logs by avoiding sensitive data.The debug log statement includes
queueMessage: message.data
, which may contain sensitive information. Logging such data can lead to PII leakage. It's advisable to log only necessary identifiers.[security/PII leakage issue]
Apply this diff to fix the issue:
logger.debug("SharedQueueConsumer queue not found, so nacking message", { - queueMessage: message.data, + messageId: message.messageId, queueName: sanitizeQueueName(resumableRun.queue), attempt: resumableAttempt, });references/v3-catalog/src/trigger/queues.ts (1)
3-25
: Add input validation for numberOfQueues parameter.The
numberOfQueues
parameter should be validated to ensure it's greater than 0 to prevent potential division by zero in the modulo operation.Consider adding validation:
export const queuesController = task({ id: "queues/controller", run: async ({ numberOfQueues = 20, length = 20, waitSeconds = 3, }: { numberOfQueues?: number; length?: number; waitSeconds?: number; }) => { + if (numberOfQueues <= 0) { + throw new Error("numberOfQueues must be greater than 0"); + } await queuesTest.batchTriggerAndWait(apps/webapp/app/v3/models/workerDeployment.server.ts (1)
47-47
: LGTM! Good addition for queue configuration.The addition of
queueConfig
to task selection aligns well with the PR's objectives for improved queue management and TaskQueue record resolution.apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts (1)
145-147
: LGTM! Well-structured key generation method.The
nackCounterKey
method follows the established key generation pattern and maintains consistency with other methods in the class.apps/webapp/app/v3/services/createTaskRunAttempt.server.ts (3)
99-100
: LGTM! Good variable extraction.Extracting
lockedBy
improves code readability and reduces repeated property access.Also applies to: 101-104
105-110
: LGTM! Improved queue lookup.Using
findQueueInEnvironment
centralizes queue lookup logic and provides better error handling.
125-128
: LGTM! Enhanced error messaging.The error message now provides better guidance based on worker capabilities.
apps/webapp/app/v3/services/finalizeTaskRun.server.ts (2)
295-295
: LGTM! Added queue configuration support.Including
queueConfig
in the worker task selection enables enhanced queue configuration capabilities.
307-312
: LGTM! Consistent queue lookup pattern.Using
findQueueInEnvironment
maintains consistency with the centralized queue lookup pattern across the codebase.apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts (3)
657-670
: Well-structured type definitions with improved type safety!The new
RuntimeEnvironmentForEnvRepoPayload
andRuntimeEnvironmentForEnvRepo
types provide better type safety by explicitly defining the required fields for runtime environment data.
674-676
: LGTM! Clear separation of variable resolution logic.The function effectively combines project secrets, overridable trigger variables, and environment-specific built-in variables.
Line range hint
705-724
: Verify fallback values for optional environment variables.The code uses optional chaining for several environment variables (e.g.,
env.DEV_OTEL_EXPORTER_OTLP_ENDPOINT ?? env.APP_ORIGIN
). Ensure that at least one of the fallback values is always defined to prevent undefined values.✅ Verification successful
Fallback chains are properly implemented
The environment variables are correctly configured with APP_ORIGIN (defaulting to "http://localhost:3030") serving as the guaranteed fallback value for all chains. The optional variables (API_ORIGIN, STREAM_ORIGIN, DEV_OTEL_EXPORTER_OTLP_ENDPOINT) are properly handled with the nullish coalescing operator, ensuring no undefined values can occur.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for environment variable definitions to verify fallback values rg -A 1 'APP_ORIGIN|API_ORIGIN|STREAM_ORIGIN' .env*Length of output: 345
Script:
#!/bin/bash # Search for environment variable validation and configuration ast-grep --pattern 'const env = $$$' rg -A 5 'process\.env\.' apps/webapp/app/ rg -A 5 'env\.' apps/webapp/app/config/Length of output: 25409
Script:
#!/bin/bash # Check environment variable schema validation rg -A 10 'APP_ORIGIN|API_ORIGIN|STREAM_ORIGIN|DEV_OTEL_EXPORTER_OTLP_ENDPOINT' apps/webapp/app/env.server.tsLength of output: 1578
apps/webapp/app/services/apiAuth.server.ts (1)
414-414
: Verify the impact of the type change on JWT token generation.The parameter type change from
RuntimeEnvironment
toRuntimeEnvironmentForEnvRepo
looks good, but let's ensure all required properties are available in the new type.Run the following script to verify the type compatibility:
✅ Verification successful
Type change is safe for JWT token generation
The
RuntimeEnvironmentForEnvRepo
type includes all required properties (id
,organizationId
, andprojectId
) used in JWT token generation. The type is actually more precise as it explicitly declares the available fields through a Prisma payload selection.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check if RuntimeEnvironmentForEnvRepo has all required properties used in JWT token generation # Test: Search for properties accessed on the environment parameter ast-grep --pattern 'environment.$_' # Test: Compare with properties available in RuntimeEnvironmentForEnvRepo ast-grep --pattern 'type RuntimeEnvironmentForEnvRepo = { $$$ }'Length of output: 52426
Script:
#!/bin/bash # Search for RuntimeEnvironmentForEnvRepo type definition with a more flexible pattern ast-grep --pattern 'type RuntimeEnvironmentForEnvRepo' # Get the content of the file where the type is used cat apps/webapp/app/services/apiAuth.server.tsLength of output: 14161
Script:
#!/bin/bash # Get the content of the file containing the type definition cat apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.tsLength of output: 23576
packages/core/src/v3/schemas/messages.ts (1)
Line range hint
19-34
: LGTM! Type safety improvements look good.The changes improve type safety and maintainability by:
- Following TypeScript naming conventions with the
AckCallbackResult
export- Making the type available for external use
- Ensuring consistent usage in
PlatformToProviderMessages
andPlatformToCoordinatorMessages
Also applies to: 292-292, 535-535
apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
Show resolved
Hide resolved
@trigger.dev/core
@trigger.dev/build
@trigger.dev/rsc
@trigger.dev/sdk
trigger.dev
@trigger.dev/react-hooks
commit: |
9affe8e
to
55fb1f3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
apps/webapp/app/models/taskQueue.server.ts (1)
61-63
: 🛠️ Refactor suggestionAdd validation for empty queue names.
The sanitization function should handle empty string cases after sanitization.
export function sanitizeQueueName(queueName: string) { - return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128); + const sanitized = queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128); + if (!sanitized) { + throw new Error("Queue name cannot be empty after sanitization"); + } + return sanitized; }
🧹 Nitpick comments (5)
references/v3-catalog/src/trigger/queues.ts (1)
3-25
: Add input validation for numeric parameters.The function accepts numeric parameters without validation. Consider adding checks for negative numbers and zero values.
export const queuesController = task({ id: "queues/controller", run: async ({ numberOfQueues = 20, length = 20, waitSeconds = 3, }: { numberOfQueues?: number; length?: number; waitSeconds?: number; }) => { + if (numberOfQueues <= 0) throw new Error("numberOfQueues must be positive"); + if (length <= 0) throw new Error("length must be positive"); + if (waitSeconds <= 0) throw new Error("waitSeconds must be positive"); await queuesTest.batchTriggerAndWait( Array.from({ length }, (_, i) => ({ payload: { waitSeconds }, options: { queue: { name: `queue-${i % numberOfQueues}`, }, }, })) ); }, });apps/webapp/app/models/taskQueue.server.ts (1)
5-58
: Add error handling and consider caching.The function performs multiple database operations without error handling. Additionally, for frequently accessed queues, consider implementing a caching mechanism.
export async function findQueueInEnvironment( queueName: string, environmentId: string, backgroundWorkerTaskId?: string, backgroundTask?: { queueConfig?: unknown } ): Promise<TaskQueue | undefined> { const sanitizedQueueName = sanitizeQueueName(queueName); + try { const queue = await prisma.taskQueue.findFirst({ where: { runtimeEnvironmentId: environmentId, name: sanitizedQueueName, }, }); if (queue) { return queue; } const task = backgroundTask ? backgroundTask : backgroundWorkerTaskId ? await prisma.backgroundWorkerTask.findFirst({ where: { id: backgroundWorkerTaskId, }, }) : undefined; if (!task) { return; } const queueConfig = QueueOptions.safeParse(task.queueConfig); if (queueConfig.success) { const taskQueueName = queueConfig.data.name ? sanitizeQueueName(queueConfig.data.name) : undefined; if (taskQueueName && taskQueueName !== sanitizedQueueName) { const queue = await prisma.taskQueue.findFirst({ where: { runtimeEnvironmentId: environmentId, name: taskQueueName, }, }); if (queue) { return queue; } } } + } catch (error) { + throw new Error(`Failed to find queue: ${error.message}`); + } }apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts (1)
51-89
: Extract magic numbers into named constants.The randomization logic uses magic numbers that should be extracted into named constants for better maintainability.
+const PROBABILITY_THRESHOLD = 0.2; // 20% threshold for maintaining weight order +const RANDOM_FACTOR_RANGE = 0.2; // ±10% random adjustment + distributeQueues(queues: QueueWithScores[]): Array<string> { // ... existing code ... const shuffledWeightedQueues = weightedQueues .map((queueInfo, index) => ({ ...queueInfo, // Add some controlled randomness while maintaining general weight order - randomFactor: Math.random() * 0.2 - 0.1, // ±10% random adjustment + randomFactor: Math.random() * RANDOM_FACTOR_RANGE - (RANDOM_FACTOR_RANGE / 2), originalIndex: index, })) .sort((a, b) => { // If probability difference is significant (>20%), maintain order - if (Math.abs(a.probability - b.probability) > 0.2) { + if (Math.abs(a.probability - b.probability) > PROBABILITY_THRESHOLD) { return b.probability - a.probability; } // Otherwise, allow some randomization while keeping similar weights roughly together return b.probability + b.randomFactor - (a.probability + a.randomFactor); })apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (2)
1092-1120
: Improve timeout handling for coordinator communication.The coordinator emitWithAck timeout is hardcoded to an environment variable. Consider:
- Adding a fallback timeout value
- Implementing exponential backoff for retries
- .timeout(env.SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS) + .timeout(env.SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS ?? 5000)
1514-1518
: Consider using a utility library for array chunking.The custom
chunk
function could be replaced with a well-tested utility library like Lodash's_.chunk
.-function chunk<T>(arr: T[], chunkSize: number): T[][] { - return Array.from({ length: Math.ceil(arr.length / chunkSize) }, (_, i) => - arr.slice(i * chunkSize, i * chunkSize + chunkSize) - ); -} +import { chunk } from 'lodash';
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (19)
apps/webapp/app/env.server.ts
(2 hunks)apps/webapp/app/models/taskQueue.server.ts
(1 hunks)apps/webapp/app/routes/admin.api.v1.marqs.ts
(0 hunks)apps/webapp/app/services/apiAuth.server.ts
(2 hunks)apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
(5 hunks)apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
(2 hunks)apps/webapp/app/v3/marqs/index.server.ts
(26 hunks)apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
(1 hunks)apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
(31 hunks)apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts
(2 hunks)apps/webapp/app/v3/marqs/types.ts
(2 hunks)apps/webapp/app/v3/marqs/v2.server.ts
(1 hunks)apps/webapp/app/v3/models/workerDeployment.server.ts
(1 hunks)apps/webapp/app/v3/services/createBackgroundWorker.server.ts
(2 hunks)apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
(7 hunks)apps/webapp/app/v3/services/finalizeTaskRun.server.ts
(4 hunks)apps/webapp/app/v3/services/triggerTask.server.ts
(2 hunks)packages/core/src/v3/schemas/messages.ts
(4 hunks)references/v3-catalog/src/trigger/queues.ts
(1 hunks)
💤 Files with no reviewable changes (1)
- apps/webapp/app/routes/admin.api.v1.marqs.ts
🚧 Files skipped from review as they are similar to previous changes (8)
- apps/webapp/app/v3/models/workerDeployment.server.ts
- apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
- apps/webapp/app/v3/marqs/v2.server.ts
- apps/webapp/app/v3/services/triggerTask.server.ts
- apps/webapp/app/services/apiAuth.server.ts
- apps/webapp/app/v3/services/createBackgroundWorker.server.ts
- apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
- apps/webapp/app/env.server.ts
👮 Files not reviewed due to content moderation or server errors (5)
- apps/webapp/app/v3/services/finalizeTaskRun.server.ts
- apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
- packages/core/src/v3/schemas/messages.ts
- apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
- apps/webapp/app/v3/marqs/index.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
apps/webapp/app/v3/marqs/index.server.ts
[error] 450-450: Unnecessary continue statement
Unsafe fix: Delete the unnecessary continue statement
(lint/correctness/noUnnecessaryContinue)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (buildjet-8vcpu-ubuntu-2204 - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (buildjet-8vcpu-ubuntu-2204 - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (11)
references/v3-catalog/src/trigger/queues.ts (2)
27-32
: LGTM!The implementation is clean and includes a sensible default for the wait duration.
34-42
: LGTM!The task is well-defined with a specific queue name and simple logging.
apps/webapp/app/v3/marqs/types.ts (2)
50-50
: LGTM!The method signature is clear and consistent with other key generation methods in the interface.
69-76
: Add JSDoc documentation for new methods.The new methods lack documentation explaining their purpose and parameters.
+ /** + * Distributes queues based on their scores and capacities + * @param queues Array of queues with their associated scores and capacities + * @returns Array of queue names in their distributed order + */ distributeQueues(queues: Array<QueueWithScores>): Array<string>; + /** + * Calculates the next range of queues to process + * @param parentQueue The parent queue that holds the candidate queues + * @param consumerId The consumer requesting the next range + * @param currentRange Current processing range + * @param queueSize Total size of the queue + * @returns Next range to process + */ moveToNextRange( parentQueue: string, consumerId: string, currentRange: QueueRange, queueSize: number ): QueueRange;apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts (2)
29-49
: LGTM!The implementation handles range progression and reset correctly with clear comments explaining the logic.
133-149
: LGTM!The no-op implementation correctly provides default values and empty results.
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (5)
12-16
: LGTM! New types enhance message handling capabilities.The addition of
AckCallbackResult
andQueueOptions
types improves type safety for queue operations.
132-139
: Enhanced error tracking with new outcome field.The addition of the
outcome
field inDoWorkInternalResult
improves error tracking and debugging capabilities by categorizing results into specific outcomes.
1416-1420
: LGTM! Efficient batch processing of completed attempts.The chunking of completed attempt IDs improves performance by processing them in batches.
1496-1512
: LGTM! Well-structured error handling classes.The new error classes provide clear error categorization and improve error handling capabilities.
750-755
: Verify queue resolution edge cases.The new queue resolution logic using
findQueueInEnvironment
could potentially return undefined. Consider adding retry logic or graceful degradation for transient failures.✅ Verification successful
Existing implementation already handles queue resolution edge cases appropriately
The code already implements proper error handling for queue resolution failures:
- Explicit null check with
if (!queue)
- Debug logging of the failure
- Graceful degradation by nacking the message for retry
This pattern is consistently implemented across all usages of
findQueueInEnvironment
in the codebase.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Check for potential queue resolution failures rg -A 5 "findQueueInEnvironment.*undefined"Length of output: 45
Script:
#!/bin/bash # Find findQueueInEnvironment implementation ast-grep --pattern 'function findQueueInEnvironment($_) { $$$ }' # Check how queue is used after resolution rg -B 2 -A 10 "const queue = await findQueueInEnvironment" # Look for error handling patterns around queue resolution rg -B 5 -A 5 "try.*findQueueInEnvironment"Length of output: 5370
Summary by CodeRabbit
Release Notes
Environment Configuration
Queue Management
Task Processing
System Improvements
New Features
These changes focus on improving the reliability, performance, and flexibility of queue and task management within the system.